Last updated on 9 months ago
test
预备知识 Unix domain socket 和 网络 socket的区别 对于网络socket,大家应该比较熟悉,两个不同主机上的进程通信,就可以网络 socket来通信;对于同一台主机上的两个进程通信也可以使用socket,地址使用 127.0.0.1就可以实现,但是对于同一主机的两个进程通信而言,其数据还是需要通过网络协议栈(数据需要打包又要拆包…),这样效率并不高,后来引用 Unix domain socket(网络socket 的框架上发展出一种 IPC 机制) ,专门用于实现同一主机上的进程间通信 ,Unix domain socket 不需要经过网络协议栈,可以直接将数据从一个进程拷贝到另一个进程;
网络socket 使用ip地址加端口号确定 socket 地址,而 Unix domain socket 的地址是一个 socket 类型的文件路径, 这也是两者最大的区别! 其余使用基本相同。
admin socket 是什么? Ceph 集群 中有很多守护进程,如每个osd都有一个守护进程,如果我们想获取进程运行时的配置参数我们可以使用 admin socket 查看,还可以获取进程运行的状态,以及集群修改配置,获取log等。
admin socket 获取进程信息的方式通过 Unix domain socket ,即我们上文提到的 IPC机制(inter-process communication 进程间通信),admin socket 初始化时候会生成一个 socket 类型的文件(调用 bind函数的时候),其文件路径 固定在了 /var/run/ceph/ 中,(这个文件可以理解为通信地址);每次使用 adminsocket 获取进程状态的时候,都需要附带这个文件路径(下面使用会提到)。
admin socket 实现流程图
admin socket 怎么用? 简单使用 ceph daemon [socket 类型的文件路径] [command]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 [root@node29 ~] ceph-client.radosgw.gateway.28290.93853901817120.asok ceph-mon.mon-node29.asok ceph-osd.1.asok ceph-mgr.node29.asok ceph-osd.0.asok ceph-osd.2.asok [root@node29 run] no valid command found; 7 closest matches: config set <var> <val> [<val>...] config help {<var>} config diff config get <var> config diff get <var> config show config unset <var> [root@node29 run] { "name" : "osd.0" , "cluster" : "ceph" , "admin_socket" : "/var/run/ceph/ceph-osd.0.asok" , "admin_socket_mode" : "" , "auth_client_required" : "cephx" , "auth_cluster_required" : "cephx" , ...... } [root@node29 run] { "admin_socket" : "/var/run/ceph/ceph-osd.0.asok" } [root@node29 run] [root@node29 ceph] { "success" : "" } ...... [root@node29 ~] { "AsyncMessenger::Worker-0" : { "msgr_recv_messages" : 2665895, "msgr_send_messages" : 2665927, "msgr_recv_bytes" : 853128095, "msgr_send_bytes" : 4958397491, "msgr_created_connections" : 13, "msgr_active_connections" : 2, "msgr_running_total_time" : 600.146200311, "msgr_running_send_time" : 223.453728410, "msgr_running_recv_time" : 152.217070353, "msgr_running_fast_dispatch_time" : 77.218817120 },
注意事项
ceph daemon 后面跟的文件路径一定要为绝对路径
使用 admin sokcet修改参数,不一定马上生效,还需要结合实际情况分析
admin socket启动整体流程 admin socket 大致的启动流程
admin socket 执行命令流程
AdminSocket 类在代码中 admin socket和 AdminSocket 类有关系,在一开始初始化 CephContext (ceph的上下文)时候,就已经 new 出 AdminSocket类的实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 CephContext::CephContext (uint32_t module_type_, enum code_environment_t code_env, int init_flags_){ _admin_socket = new AdminSocket (this ); _admin_hook = new CephContextHook (this ); _admin_socket->register_command ("assert" , "assert" , _admin_hook, "" ); _admin_socket->register_command ("abort" , "abort" , _admin_hook, "" ); _admin_socket->register_command ("perfcounters_dump" , "perfcounters_dump" , _admin_hook, "" ); _admin_socket->register_command ("config show" , "config show" , _admin_hook, "dump current config settings" ); }
从 register_command 的参数可以推断出 是添加命令并注册命令对应的回调函数,这里结合代码概述下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class AdminSocketHook {public : virtual bool call (std::string_view command, const cmdmap_t & cmdmap, std::string_view format, bufferlist& out) = 0 ; virtual ~AdminSocketHook () {} };int AdminSocket::register_command (std::string_view command, std::string_view cmddesc, AdminSocketHook *hook, std::string_view help) { int ret; std::unique_lock l (lock) ; auto i = hooks.find (command); if (i != hooks.cend ()) { ldout (m_cct, 5 ) << "register_command " << command << " hook " << hook << " EEXIST" << dendl; ret = -EEXIST; } else { ldout (m_cct, 5 ) << "register_command " << command << " hook " << hook << dendl; hooks.emplace_hint (i, std::piecewise_construct, std::forward_as_tuple(command), std::forward_as_tuple(hook, cmddesc, help)); ret = 0 ; } return ret; }
admin socket 启动流程 一开始就提到过 用户是通过socket来获取ceph中配置信息的,那肯定是有初始化流程,结合代码看看其启动流程
有一个函数在 ceph_osd 模块中出现了很多次 common_init_finish (出现了七次),
但其实跳转进去看,最多会执行一次,因为里面有个标志位;这个函数 主要是启动日志服务线程和 admin socket的服务线程,而多次调用是我猜是确保服务线程启动;
最终在 里看到 AdminSocket 的init函数
AdminSocket::init 函数主要是启动 AdminSocket 线程,既然是启了线程,这个线程应该是 osd 的守护进程启动的,在系统看 果然是有 admin_socket;
AdminSocket::init 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 bool AdminSocket::init (const std::string& path) { hrp ("admin socket 初始化" ) std::string err; int pipe_rd = -1 , pipe_wr = -1 ; err = create_shutdown_pipe (&pipe_rd, &pipe_wr); int sock_fd; err = bind_and_listen (path, &sock_fd); m_sock_fd = sock_fd; m_shutdown_rd_fd = pipe_rd; m_shutdown_wr_fd = pipe_wr; m_path = path; hrp (m_path); version_hook = std::make_unique <VersionHook>(); register_command ("0" , "0" , version_hook.get (), "" ); ..... th = make_named_thread ("admin_socket" , &AdminSocket::entry, this ); add_cleanup_file (m_path.c_str ()); return true ; }
从 AdminSocket::init 可以看的,主要有 三个部分组成,创建 pipe、socket,以及启动 线程
AdminSocket::create_shutdown_pipe 创建一个管道,在注释中可以了解到,线程会监听 读管道,当这有有数据的时候,会优雅 的将线程 kill 掉,其实从名字也会可以推测出 创建 shutdown 管道
1 2 3 4 5 6 7 8 9 10 11 12 13 14 std::string AdminSocket::create_shutdown_pipe (int *pipe_rd, int *pipe_wr) { int pipefd[2 ]; if (pipe_cloexec (pipefd) < 0 ) { int e = errno; ostringstream oss; oss << "AdminSocket::create_shutdown_pipe error: " << cpp_strerror (e); return oss.str (); } *pipe_rd = pipefd[0 ]; *pipe_wr = pipefd[1 ]; return "" ; }
bind_and_listen(path, &sock_fd); 函数主要工作是生成 socket,但这里的socket是单机版的socket( UNI Domian Socket ),和以前接触网络socket 不相同,不同之处如下所示(这也说明为什么ceph daemon 后面要加个 asok文件 !)
从 bind_and_listen 也可以看出确实是 用sockaddr_un 结构体,其实网络socket也是可以实现的(地址回环),那为什么用 UNI Domian Socket 呢?
当我们使用 admin socket 读取或修改ceph 配置时候,都是需要在对应节点主机上操作,假如说ceph 集群中有 A、B、C三个几点,各自部署了 osd,A节点不能直接修改 B节点的osd配置(远程过去也行…),因为 那个 asok文件并不在A节点上,现在这些操作都是在同一台主机上,相当于 IPC(进程间通信),UNIX Domain Socket是不需要 经过网络协议栈,只需要将 应用层数据从一个进程拷贝到另一个进程 ,这样效率比较高
接下来步骤就是 bind 和 listen 了,和 网络socket建立没多大区别…. 一个 socket 只能bind一次, 在 UNI Domian Socket 调用bind的时候,会根据 address 给出的路径生成 socket文件
start thread 启动线程用来监听socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 m_sock_fd = sock_fd; m_shutdown_rd_fd = pipe_rd; m_shutdown_wr_fd = pipe_wr; m_path = path; hrp (m_path); version_hook = std::make_unique <VersionHook>(); register_command ("0" , "0" , version_hook.get (), "" ); register_command ("version" , "version" , version_hook.get (), "get ceph version" ); register_command ("git_version" , "git_version" , version_hook.get (), "get git sha1" ); help_hook = std::make_unique <HelpHook>(this ); register_command ("help" , "help" , help_hook.get (), "list available commands" ); getdescs_hook = std::make_unique <GetdescsHook>(this ); register_command ("get_command_descriptions" , "get_command_descriptions" , getdescs_hook.get (), "list available commands" ); th = make_named_thread ("admin_socket" , &AdminSocket::entry, this ); add_cleanup_file (m_path.c_str ());
AdminSocket::entry admin_socket 线程运行的函数,和常规写法一样用一个 while循环,里面用 poll 来检测两个 m_sock_fd,和 m_shutdown_rd_fd(就是最初创建 pipe那个 )的事件响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void AdminSocket::entry () noexcept { ..... while (true ) { struct pollfd fds[2 ]; fds[0 ].fd = m_sock_fd; fds[0 ].events = POLLIN | POLLRDBAND; fds[1 ].fd = m_shutdown_rd_fd; fds[1 ].events = POLLIN | POLLRDBAND; int ret = poll (fds, 2 , -1 ); if (ret < 0 ) { int err = errno; if (err == EINTR) { continue ; } lderr (m_cct) << "AdminSocket: poll(2) error: '" << cpp_strerror (err) << dendl; return ; } if (fds[0 ].revents & POLLIN) { do_accept (); } if (fds[1 ].revents & POLLIN) { return ; } } ldout (m_cct, 5 ) << "entry exit" << dendl; }
AdminSocket::do_accept() 当 m_sock_fd 有时间响应时候,会执行 do_accept(); 看函数名应该是 对收到的数据做些处理
accept 返回一个 fd,这里是用while 逐个字符读取 客户端传来的命令
while里面做了些处理,结合日志,可以看到最终读出来的格式是字符串是 json 格式的
最后字符传到 execute_command 函数中
现在 知道传进来的字符串 c 是一条 json字符串, 在 execute_command 函数中,会将 json解析到map中(目的是 提取value)
接下来是从 map中获取我们需要的 命令, 假如说 c是 {“prefix”,”config show”} ,那么 match为 config show
在文章的 整体流程部分提到了 ,cct初始化时,会注册一些函数到 hooks 中 ,现在就通过 传进来的参数在 hooks中查找对应的回调,hooks也是一个map
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 std::unique_lock l (lock) ; decltype (hooks)::iterator p; while (match.size ()) { p = hooks.find (match); if (p != hooks.cend ()) break ; size_t pos = match.rfind (' ' ); if (pos == std::string::npos) { match.clear (); break ; } else { match.resize (pos); } } if (p == hooks.cend ()) { lderr (m_cct) << "AdminSocket: request '" << cmd << "' not defined" << dendl; return false ; } string args; if (match != cmd) { args = cmd.substr (match.length () + 1 ); } in_hook = true ; auto match_hook = p->second.hook; l.unlock (); bool success = (validate (match, cmdmap, out) && match_hook->call (match, cmdmap, format, out)); l.lock (); in_hook = false ; in_hook_cond.notify_all ();
执行完 注册的函数后,其输出的内容保存在 bufferlist类型的 out 中,最后写入 到新建连接的 connection_fd中
客户端 如何发起连接和读数据的 当我们输入 ceph daemon 的时候,首先会先解析命令,ceph daemon 命令首先 会在ceph.in 中的函数maybe_daemon_command 被解析:
如果命令解析正确,最后会执行 admin_socket 函数
admin_socket 函数 在 ceph_daemon.py 文件里
在 admin_socket 函数中有个关键的函数 do_sockio 用于发起连接,并发送命令:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 def do_sockio (path, cmd_bytes ): """ helper: do all the actual low-level stream I/O """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(path) try : sock.sendall(cmd_bytes + b'\0' ) len_str = sock.recv(4 ) if len (len_str) < 4 : raise RuntimeError("no data returned from admin socket" ) l, = struct.unpack(">I" , len_str) sock_ret = b'' got = 0 while got < l: want = min (l - got, READ_CHUNK_SIZE) bit = sock.recv(want) sock_ret += bit got += len (bit) except Exception as sock_e: raise RuntimeError('exception: ' + str (sock_e)) return sock_ret
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 try : cmd_json = do_sockio(asok_path, b'{"prefix": "get_command_descriptions"}' ) except Exception as e: raise RuntimeError('exception getting command descriptions: ' + str (e)) if cmd == 'get_command_descriptions' : return cmd_json sigdict = parse_json_funcsigs(cmd_json.decode('utf-8' ), 'cli' ) valid_dict = validate_command(sigdict, cmd) if not valid_dict: raise RuntimeError('invalid command' ) if format : valid_dict['format' ] = format try : ret = do_sockio(asok_path, json.dumps(valid_dict).encode('utf-8' )) except Exception as e: raise RuntimeError('exception: ' + str (e)) return ret
返回的结果最后写到终端上
执行对应的函数 以上,一个命令执行的整体流程就结束了,结合一个命令分析并做一个总结
比如说 config命令,注册时候 command 为 config show,命令描述也是,回调类是 **CephContextHook *_admin_hook**,最后一个参数信息是帮助信息
和 类重写父类的 call,最终调用的 do_comand 函数
do_comand 里面一大推 if-else 做字符串匹配
_show_config 最终调用了 _show_config 函数
从上图可以看出,有些 参数是保存到Formatter *f中,Formatter可以理解为 是一种格式化流;最后执行完回到 do_commad 函数,会将 格式化好的数据刷新到到 out 中
这里做个测试 : 我在 _show_config 中添加些内容,看下打印的结果怎样
自定义一个hook 尝试自己注册一个命令,这里回调函数还是 和show config 相同,添加了if 语句